package cn.zhaopin.starter.mq.config;

import cn.zhaopin.starter.mq.interceptor.PulsarConsumerInterceptor;
import cn.zhaopin.starter.mq.annotation.PulsarMessageListener;
import cn.zhaopin.starter.mq.common.PulsarMessage;
import cn.zhaopin.starter.mq.common.PulsarMessageExt;
import cn.zhaopin.starter.mq.core.PulsarConsumerFactory;
import cn.zhaopin.starter.mq.core.PulsarListener;
import cn.zhaopin.starter.mq.support.JSON2Schema;
import cn.zhaopin.starter.mq.support.PulsarJackson2MessageConverter;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;

import javax.ws.rs.NotSupportedException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Objects;

/**
 * Description: default listener container
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/21-14:05
 */
@SuppressWarnings({"WeakerAccess", "unchecked", "ConstantConditions", "NullableProblems", "rawtypes", "RedundantExplicitVariableType"})
public class DefaultPulsarListenerContainer implements InitializingBean,
        DisposableBean, SmartLifecycle, ApplicationContextAware {

    private final static Logger log = LoggerFactory.getLogger(DefaultPulsarListenerContainer.class);

    private ApplicationContext applicationContext;

    /**
     * The name of the DefaultPulsarListenerContainer instance
     */
    private String name;

    private boolean running;

    private static final String CHARSET = "UTF-8";

    private Consumer<PulsarMessage> consumer;
    private ConsumerBuilder<PulsarMessage> consumerBuilder;
    private PulsarClient client;

    private PulsarConsumerFactory consumerFactory;

    private MessageConverter messageConverter;

    private PulsarMessageListener messageListener;
    private PulsarListener pulsarListener;

    /** properties from @PulsarMessageListener */
    private String topic;
    private String subscriptionName;
    private SubscriptionType subscriptionType;
    private SubscriptionInitialPosition subscriptionInitialPosition;

    /** payload type defined */
    private Type payloadType;

    @Override
    public void destroy() throws Exception {
        this.setRunning(false);
        if(Objects.nonNull(consumer) && consumer.isConnected()) {
            consumer.close();
        }
        log.info("container destroyed, {}", this.toString());
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        this.payloadType = this.getType();

        initMessageConverter();

        initPulsarConsumer();

    }

    private void initMessageConverter() {
        PulsarJackson2MessageConverter jackson2MessageConverter = new PulsarJackson2MessageConverter();
        jackson2MessageConverter.setObjectMapper(JSON2Schema.getObjectMapper());
        jackson2MessageConverter.setSerializedPayloadClass(String.class);
        messageConverter = jackson2MessageConverter;
    }

    private void initPulsarConsumer() throws PulsarClientException {
        consumerBuilder = consumerFactory.createBuilder(topic)
                .subscriptionName(subscriptionName)
                .subscriptionInitialPosition(subscriptionInitialPosition)
                .subscriptionType(subscriptionType)
                //.enableRetry(true)
                ;

        // consumerBuilder.intercept(new PulsarConsumerInterceptor());
        consumerBuilder.messageListener(new DefaultMessageListener());
    }

    private class DefaultMessageListener implements MessageListener<PulsarMessage> {

        @Override
        public void received(Consumer<PulsarMessage> consumer, Message<PulsarMessage> message) {
            // listener
            boolean success = false;
            try {
                success = pulsarListener.onMessage(doConvertMessage(message));
            } catch (Exception e) {
                e.printStackTrace();
            }

            // ack message
            ack(consumer, message, success);
        }

        @Override
        public void reachedEndOfTopic(Consumer<PulsarMessage> consumer) {
            System.out.println("reachedEndOfTopic");
        }

        private void ack(Consumer<PulsarMessage> consumer, Message<PulsarMessage> message, boolean success) {
            if (success) {
                try {
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    log.error("Pulsar consumer ack fail");
                    e.printStackTrace();
                }
                return;
            }
            consumer.negativeAcknowledge(message);
            log.warn("Pulsar consumer topic: {} ,Unconfirmed success, msg: {}", message.getTopicName(), message.getValue().getPayload());
        }
    }

    public PulsarMessageExt doConvertMessage(Message<PulsarMessage> message) {
        PulsarMessage value = message.getValue();

        PulsarMessageExt messageExt = new PulsarMessageExt();

        Object payloadObj = value.getPayload();
        if (Objects.equals(payloadType, String.class)) {
            messageExt.setBody(payloadObj.toString());
        } else if (Objects.equals(payloadType, byte[].class)) {
            try {
                messageExt.setBody(payloadObj.toString().getBytes(CHARSET));
            } catch (UnsupportedEncodingException e) {
                log.info("convert failed. payloadObj:{}, payloadType:{}", payloadObj, payloadType);
                throw new RuntimeException("cannot convert message to " + payloadType, e);
            }
        }
        else {
            try {
                if (payloadType instanceof Class) {
                    messageExt.setBody(this.getMessageConverter().fromMessage(MessageBuilder.withPayload(payloadObj).build(), (Class<?>) payloadType));
                } else {
                    throw new NotSupportedException("Not Support payload class type");
                }
            } catch (Exception e) {
                log.info("convert failed. payloadObj:{}, payloadType:{}", payloadObj, payloadType);
                throw new RuntimeException("cannot convert message to " + payloadType, e);
            }
        }
        messageExt.setTopic(message.getTopicName());
        messageExt.setMessageId(message.getMessageId());
        messageExt.setHeaders(value.getHeaders());
        return messageExt;
    }

    private Type getType() {
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(pulsarListener);
        Type matchedGenericInterface = null;
        while (Objects.nonNull(targetClass)) {
            Type[] interfaces = targetClass.getGenericInterfaces();
            if (Objects.nonNull(interfaces)) {
                for (Type type : interfaces) {
                    if (type instanceof ParameterizedType &&
                            (Objects.equals(((ParameterizedType) type).getRawType(), PulsarListener.class))) {
                        matchedGenericInterface = type;
                        break;
                    }
                }
            }
            targetClass = targetClass.getSuperclass();
        }
        if (Objects.isNull(matchedGenericInterface)) {
            return Object.class;
        }

        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
            return actualTypeArguments[0];
        }
        return Object.class;
    }

    @Override
    public int getPhase() {
        // Returning Integer.MAX_VALUE only suggests that
        // we will be the first bean to shutdown and last bean to start
        return Integer.MAX_VALUE;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("container already running. " + this.toString());
        }
        try {
            consumer = consumerBuilder.subscribe();
            setRunning(true);
        } catch (PulsarClientException e) {
            e.printStackTrace();
            log.error("consumer start error topic : {}", topic, e);
        }
        log.info("running container: {}", this.toString());
    }

    @Override
    public void stop() {
        if(this.isRunning() && Objects.nonNull(consumer) && consumer.isConnected()) {
            try {
                consumer.close();
                this.setRunning(false);
            } catch (PulsarClientException e) {
                log.error("consumer close error, topic: {}", topic, e);
                e.printStackTrace();
            }
        }
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setClient(PulsarClient client) {
        this.client = client;
    }

    public void setMessageListener(PulsarMessageListener annotation) {
        this.messageListener = annotation;
        this.topic = annotation.topic();
        this.subscriptionName = annotation.subscriptionName();
        this.subscriptionType = annotation.subscriptionType();
        this.subscriptionInitialPosition = annotation.subscriptionInitialPosition();
    }

    public void setConsumerFactory(PulsarConsumerFactory consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public void setPulsarListener(PulsarListener listener) {
        this.pulsarListener = listener;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public MessageConverter getMessageConverter() {
        return messageConverter;
    }

    @Override
    public String toString() {
        return "DefaultPulsarListenerContainer{" +
                ", name='" + name + '\'' +
                ", running=" + running +
                ", consumer=" + consumer +
                ", topic='" + topic + '\'' +
                ", subscriptionName='" + subscriptionName + '\'' +
                ", subscriptionType=" + subscriptionType +
                '}';
    }

}
